Skip to content

Conversation

DiannaHohensee
Copy link
Contributor

@DiannaHohensee DiannaHohensee commented Jul 17, 2025

Adds a new transport action to collect usage stats from the
data nodes. ClusterInfoService uses the action to pull thread
pool usage information from the data nodes to the master node
periodically.

Also removes NodeUsageStatsForThreadPoolsCollector as
a plugin interface and replaces it with a single class
implementation. We no longer need a serverless collector
implementation, it can all be done in stateful.

Closes ES-12316

Adds a new transport action to collect usage stats from the
data nodes. ClusterInfoService uses the action to pull thread
pool usage information from the data nodes to the master node
periodically.

Also removes NodeUsageStatsForThreadPoolsCollector as
an interface/plugin and replaces it with a single
class implementation.

Closes ES-12316
@DiannaHohensee DiannaHohensee self-assigned this Jul 17, 2025
@DiannaHohensee DiannaHohensee requested a review from a team as a code owner July 17, 2025 21:11
@DiannaHohensee DiannaHohensee added :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) Team:Distributed Coordination Meta label for Distributed Coordination team v9.2.0 labels Jul 17, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

/**
* Defines the request/response types for {@link TransportNodeUsageStatsForThreadPoolsAction}.
*/
public class NodeUsageStatsForThreadPoolsAction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a TransportNodesStatsAction which can produce thread-pool usage. Do we need a separate action for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calls to collect the thread pool stats are destructive. For example, collecting the max queue latency seen since the last call and then resetting max seen to zero. Pool utilization is also destructive, resetting an execution time tracker after collection. So we can't hook the new stats up to the TransportNodesStatsAction API and have random callers clearing the state we'll need for allocation.

Copy link
Contributor

@nicktindall nicktindall Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're building a whole separate set of polling messages just because of the destructive-ness of the utilisation read, can we instead just limit the re-calculation to no more often than 30s or something (returning the most recent calculated value until that interval has passed) and have all readers poll from a single monitor? Or instead just switch to having a separate dedicated utilisation polling task on the data node, and make the reads non-destructive.

That sounds like a tidier alternative to me. We can discuss tomorrow at the catch up, but it seems to me like having separate monitoring state is now cascading into more work/maintenance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we instead just limit the re-calculation to no more often than 30s or something (returning the most recent calculated value until that interval has passed) and have all readers poll from a single monitor?

If we make it time based, recalculated on the data node every 30 seconds, and then the master node polls every 30 seconds, operations can race such that the master node sees the same value twice and misses a value that is just about to be calculated.

Or instead just switch to having a separate dedicated utilisation polling task on the data node, and make the reads non-destructive.

For this we'd have to build a component on the data node to asynchronously run a thread every 30 seconds to calculate a new value. Not obviously easier than building a TransportAction (which is also already implemented). Transport actions are apparently lightweight and there isn't concern about adding more, FWIW.

There was a discussion on this subject back in the 07-01 meeting (David, Pooya and I) (minutes 10-32). We were thinking EWMA at the time. We did like the idea of using the node stats api. The concern became public documentation of the new values because we were uncertain whether we'd want to change what metrics were sent in future. We can't change publicly visible stats. Arguably the stats we have now are easier to explain, though I'm not sure they're final yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the new action is more about having a separate action, request/response that we can play around with and include things into that may never go to node-stats. There is also not too much overhead to a new action so I prefer to have it as such. One can argue that node-stats should maybe not be used from cluster-info - rather we should have a dedicated action - but let us not go there now.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Left a few comments, but no need for another round.

public static class Request extends BaseNodesRequest {
public Request() {
// Send all nodes a request by specifying null.
super((String[]) null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to send this to indexing nodes only, so might be worth adding the option to specify the nodes here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this to take a list of nodes, and limited the given nodes to data nodes, since this is stateful and doesn't know about index nodes.

public Map<String, NodeUsageStatsForThreadPools> getAllNodeUsageStatsForThreadPools() {
Map<String, NodeUsageStatsForThreadPools> allNodeUsageStatsForThreadPools = new HashMap<>();
for (NodeUsageStatsForThreadPoolsAction.NodeResponse nodeResponse : getNodes()) {
// NOMERGE: Is the nodeID in NodeUsageStatsForThreadPools redundant? What is it useful for? If not, remove?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you want to fix this before merge? I am OK to keep it in the map and in the object, that is quite common.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I'll remove it. Let's see how this plays out, thanks for giving it a think.

/**
* Defines the request/response types for {@link TransportNodeUsageStatsForThreadPoolsAction}.
*/
public class NodeUsageStatsForThreadPoolsAction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the new action is more about having a separate action, request/response that we can play around with and include things into that may never go to node-stats. There is also not too much overhead to a new action so I prefer to have it as such. One can argue that node-stats should maybe not be used from cluster-info - rather we should have a dedicated action - but let us not go there now.

if (clusterState.getMinTransportVersion().onOrAfter(TransportVersions.TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) {
client.execute(
TransportNodeUsageStatsForThreadPoolsAction.TYPE,
new NodeUsageStatsForThreadPoolsAction.Request(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only want to direct this to data nodes or even indexing nodes? No need to contact master or coordination nodes nor search only nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I wandered down this rabbit hole initially, wondering how to collect a list of only index nodes. The problem is that this code is not in serverless, this is in the main repo: we don't have search vs index nodes :) I was thinking in stateful we would want to contact all the nodes. Though now that I think further, I should be able to filter for data role nodes -- at least not send requests to master role only nodes and such. I'll look into that now.

Down the line, we should have thread pool stats from search nodes for some kind of SearchLoadDecider. In the meantime, all I can think to achieve index-only is to follow up with some kind of override plug for serverless to specially filter the nodes sent here. Or else make all the write load decider logic serverless only, which seems like a shame to do based only on this detail.

assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be good to add a test where we block write-threads to ensure we have a queue latency here. We can do that in a follow-up, this is not essential before merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I flagged it on my JIRA ticket so I don't forget 👍

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. I've reduced the nodes sent requests from all to only data nodes.

public static class Request extends BaseNodesRequest {
public Request() {
// Send all nodes a request by specifying null.
super((String[]) null);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this to take a list of nodes, and limited the given nodes to data nodes, since this is stateful and doesn't know about index nodes.

@DiannaHohensee DiannaHohensee added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Aug 1, 2025
@elasticsearchmachine elasticsearchmachine merged commit afc2051 into elastic:main Aug 1, 2025
33 checks passed
@DiannaHohensee DiannaHohensee deleted the 2025/07/17/new-allocation-transport-action branch August 1, 2025 19:24
szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 1, 2025
…cking

* upstream/main: (166 commits)
  Reduce inactive sink interval in VectorSimilarityFunctionsIT (elastic#132288)
  ESQL: Allow agg tests to process many columns (elastic#132358)
  Update analysis-lowercase-tokenfilter.md (elastic#132359)
  Add Sparse Vector Index Options Settings to Semantic Text Field (elastic#131058)
  Collect node thread pool usage for shard balancing (elastic#131480)
  Add tasks to validate new style transport versions (elastic#131782)
  Mute org.elasticsearch.search.routing.SearchReplicaSelectionIT testNodeSelection elastic#132354
  Mute org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT testBadAsyncId elastic#132353
  Fixes DenseVectorFieldIndexTypeUpdateIT release tests (elastic#132346)
  Fix testCloseOrReallocateDuringPartialSnapshot (elastic#132049)
  (Doc) ILM Force Merge not on HDD and happens on hosting node not current phase tier (elastic#130280)
  Run GeoIp YAML tests in multi-project cluster and fix bug discovered by tests (elastic#131521)
  Unmutes elastic#132111, seems a transient, non reproducible issue (elastic#132253)
  Mute org.elasticsearch.search.suggest.phrase.PhraseSuggesterIT testPhraseSuggestionWithNgramOnlyAnalyzerThrowsException elastic#132347
  Add AI21 support to Inference Plugin (elastic#131238)
  OpenJDK EA builds should use https instead of http (elastic#132297)
  ESQL: Normalize timeseries aggs slightly (elastic#132284)
  Avoid internal server error on suggester ngram bad request (elastic#132321)
  [ES|QL] Rerank operator improvements (elastic#132318)
  Mute org.elasticsearch.xpack.logsdb.qa.LogsDbVersusReindexedLogsDbChallengeRestIT testTermsQuery elastic#132337
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) >non-issue Team:Distributed Coordination Meta label for Distributed Coordination team v9.2.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants